/*
Author:
Date: Oct. 14, 2003
Purpose: initialize thread pool
*/

#include "tpool.h"

int tpool_init(tpool_t   *tpoolp,
		int       num_worker_threads, 
		int       max_queue_size,
		int       do_not_block_when_full
		)
{
	int	i, rtn;
	tpool_t	tpool;
   
	/* allocate a pool data structure */ 
	if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
  	{
		lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
		return	NULL;
	}
	/* initialize the fileds */
	tpool->num_threads = num_worker_threads;
	tpool->max_queue_size = max_queue_size;
	tpool->do_not_block_when_full = do_not_block_when_full;
  
	if ((tpool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
	{
    		lprintf(log, FATAL,"Unable to malloc() thread info array\n");
		return	NULL;
	}
	tpool->cur_queue_size = 0;
	tpool->queue_head = NULL; 
	tpool->queue_tail = NULL;
	tpool->queue_closed = 0;  
	tpool->shutdown = 0; 
	if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
	{
		lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
		return	NULL;
	}
	if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
	{
		lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
		return	NULL;
	}
	if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
	{
		lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
		return	NULL;
	}  
	if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
	{
		lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
		return	NULL;
	}

	/* create threads */
	for (i = 0; i != num_worker_threads; i++) 
	{
		if ((rtn = pthread_create( &(tpool->threads[i]), NULL, tpool_thread, (void *)tpool)) != 0)
		{
			lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
			return	NULL;
		}
	}
	*tpoolp = tpool;
}

int tpool_add_work(
		   tpool_t          tpool,
		   void             (*routine)(),
		   void             *arg)
{
	int	rtn;
	tpool_work_t	*workp;

	if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
	{
		lprintf(log,FATAL,"pthread mutex lock %d\n",rtn);
		return	-1;
	}

	/* no space and this caller doesn't want to wait */
	if ((tpool->cur_queue_size == tpool->max_queue_size) && tpool->do_not_block_when_full) 
	{
		if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
		{
			lprintf(log,FATAL,"pthread mutex lock %d\n",rtn);
			return	-1;
		}
		lprintf(log, FATAL,"tpool is no space !!\n");
		return -1;
	}

	while( (tpool->cur_queue_size == tpool->max_queue_size) && (!(tpool->shutdown || tpool->queue_closed))  ) 
	{
		if ((rtn = pthread_cond_wait(&(tpool->queue_not_full), &(tpool->queue_lock))) != 0)
		{
			lprintf(log,FATAL,"pthread cond wait %d\n",rtn);
			return	-1;
		}
	}

	/* the pool is in the process of being destroyed */
	if (tpool->shutdown || tpool->queue_closed) 
	{
		if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
		{
			lprintf(log,FATAL,"pthread mutex lock %d\n",rtn);
			return	-1;
		}
		lprintf(log, FATAL,"tpool is  being destroyed!!\n");
		return -1;
	}


	/* allocate work structure */
	if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
	{
		lprintf(log,FATAL,"unable to create work struct\n");
		return	-1;
	}
	workp->routine = routine;
	workp->arg = arg;
	workp->next = NULL;

	if (tpool->cur_queue_size == 0) 
	{
		tpool->queue_tail = tpool->queue_head = workp;
		if ((rtn = pthread_cond_signal(&(tpool->queue_not_empty))) != 0)
		{
			lprintf(log,FATAL,"pthread broadcast error %d\n",rtn);
			return -1;
		}
	} 
	else 
	{
		tpool->queue_tail->next = workp;
		tpool->queue_tail = workp;
	}
	tpool->cur_queue_size++; 
	if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
	{
		lprintf(log,FATAL,"pthread mutex lock %d\n",rtn);
		return	-1;
	}
	return	1;
}

int tpool_destroy(tpool_t          tpool,
		  int              finish)
{
	int	i,rtn;
	tpool_work_t	*cur_nodep;
  
	if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
	{
		lprintf(log,FATAL,"pthread mutex lock failure\n");
		return -1;
	}

	/* Is a shutdown already in progress? */
	if (tpool->queue_closed || tpool->shutdown) 
	{
		if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
 		{
			lprintf(log,FATAL,"pthread mutex lock failure\n");
			return	-1;
		}
		return;
	}
	tpool->queue_closed = 1;
  	/* If the finish flag is set, wait for workers to  drain queue */ 
	if (finish == 1) 
	{
		while (tpool->cur_queue_size != 0) 
		{
			if ((rtn = pthread_cond_wait(&(tpool->queue_empty), &(tpool->queue_lock))) != 0)
			{
				lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
				return	-1;
			}
		}
	}
	tpool->shutdown = 1;
	if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
	{
		lprintf(log,FATAL,"pthread mutex unlock %d\n",rtn);
		return -1;
	}
	/* Wake up any workers so they recheck shutdown flag */
	if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
	{
		lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
		return -1;
	}
	if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
	{
		lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
		return	-1;
	}

	/* Wait for workers to exit */
	for(i=0; i < tpool->num_threads; i++) 
	{
		if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
		{
			lprintf(log,FATAL,"pthread_join %d\n",rtn);
			return	-1;
		}
	}

	/* Now free pool structures */
	free(tpool->threads);
	while(tpool->queue_head != NULL) 
	{
		cur_nodep = tpool->queue_head->next; 
		tpool->queue_head = tpool->queue_head->next;
		free(cur_nodep);
	}
	free(tpool); 
}

void	*tpool_thread(void *arg)
{
	tpool_t tpool = (tpool_t)arg; 
	int	rtn;
	tpool_work_t	*my_workp;
	
	for(;;) 
	{

		/* Check queue for work */ 
		if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
		lprintf(log,FATAL,"pthread mutex lock %d\n",rtn);
   
		while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) 
		{
			if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty), &(tpool->queue_lock))) != 0)
				lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);  
		}
 
		/* Has a shutdown started while i was sleeping? */
		if (tpool->shutdown == 1) 
		{
			if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) 
				lprintf(log,FATAL,"pthread mutex unlock %d\n",rtn);
			pthread_exit(NULL);
		}


		/* Get to work, dequeue the next item */ 
		my_workp = tpool->queue_head;
		tpool->cur_queue_size--;
		if (tpool->cur_queue_size == 0)   tpool->queue_head = tpool->queue_tail = NULL;
		else  tpool->queue_head = my_workp->next;
 
 
		/* Handle waiting add_work threads */
		if ((!tpool->do_not_block_when_full) && (tpool->cur_queue_size ==  (tpool->max_queue_size - 1))) 
			if ((rtn = pthread_cond_signal(&(tpool->queue_not_full))) != 0) 
				lprintf(log,FATAL,"pthread cond signal %d\n",rtn);
    
		/* Handle waiting destroyer threads */
		if (tpool->cur_queue_size == 0)
			if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)
				lprintf(log,FATAL,"pthread cond signal %d\n",rtn);
        
		if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
			lprintf(log,FATAL,"pthread mutex unlock %d\n",rtn);
     
		/* Do this work item */
		(*(my_workp->routine))(my_workp->arg);
		free(my_workp);
	} 
  	return(NULL);            
}
